Crate async_event_streams
source · [−]Expand description
Library for publishing events for multiple consumers using asynchromous streams
Library provides EventStreams<T: ’static + Send + Sync> object which translates events of type T
to arbitrary number of EventStream objects, which implements standard [futures::Stream] interface
Usage sample
use futures::{executor::LocalPool, task::LocalSpawnExt, StreamExt};
use async_event_streams::EventStreams;
let mut pool = LocalPool::new();
let streams = EventStreams::new();
let mut stream = streams.create_event_stream();
let sender_task = async move {
assert!(streams.count() == 1);
streams.send_event(42, None).await;
streams.send_event(451, None).await;
streams.send_event(1984, None).await;
};
let receiver_task = async move {
let mut values = Vec::new();
while let Some(event) = stream.next().await {
values.push(*event);
}
// next() returns none when 'streams' is dropped
assert!(values == vec![42, 451, 1984]);
};
pool.spawner().spawn_local(sender_task);
pool.spawner().spawn_local(receiver_task);
pool.run();
Event processing order
When event is put to EventStreams it becomes immediately available for all EventStream objects, created by this EventStreams
.
Events comes from each stream exactly in order as they being sent.
Since reveivers work in asynchronous environment it’s possible that streams are emptied unevenly. I.e. if events 1,2,3,4,5 put to EventStreams, one EventStream subscriber could process all 5 events while another is still waiting for first.
Sometimes it’s undesirable. So the mechanism to guarantee that all events ‘1’ are handled before sending event ‘2’ is implemented.
To achieve this the send_event function returns future SentEvent. Each EventStream instance receives clone
of Event<T> object which all wraps the same instance of event. Subscribers get these Event
instances and
may hold them as long as they need it. SentEvent
future is released only when all instances of this
Event
are dropped. This guarantees that next event is sent only when previous one has been processed by all subscribers.
If such blocking is not necessary, the post_event can be used instead.
Dependent Events
Received events may cause firing new events. For example mouse button click handler is sending mouse click events. These clicks causes GUI buttons to send button press events. It may be important to guarantee that button press events are not handled in order different than mouse clicks order.
For example consider two buttons A and B. Click C1 causes button A send press
P1, click C2 causes button B send press P2. It’s guaranteed that P2 is sent after P1 (because P1 is reaction to C1,
P2 is reaction to C2, and both C1 and C2 comes from same send_event
). But there is still no guarantee that P2 is processed after P1,
because P1 and P2 are sent by different send_event
s so the blocking mechanism decribed above doesn’t help.
This may cause problems. For example: user clicks “Apply” button and then “Close” button in the dialog. But press event from “Close” button comes earlier than from “Apply”. “Close” handler destroys the dialog, “Apply” is not processed, user’s data is lost.
To avoid this the send_event and post_event have
the additional optional parameter source
- event which was the cause of the sent one. Reference to this ‘source’ event is saved
inside Event wrapper of new event and therefore source send_event
is blocked until all derived events are dropped.
So sending second click event in example above is delayed until “Apply” handler (which holds first click event) finishes.
Event sources, sinks and pipes
There are typical repeating operations with event streams. Object may generate events of different types (EventSource) and react to events (EventSink). Connecting event source to event sink can be performed by spawing asynchronous task with spawn_event_pipe
Structs
Reference-counting container with event. Each crate::EventStream instance receives clone of Event<T>
referencing the same instance of T
.
When all instances of Event<T>
are dropped, the crate::SentEvent future returned by send_event is released.
Clonable container with instance of event which hides type of event.
The Event object is just wrapper over Arc<EventBox>
with type
of event specified
Asychronous stream of events of specified type. The stream’s next()
method returns Some(Event<EVT>)
while
source object crate::EventStreams is alive and None
when it is destroyed.
Main object which allows to send events and subscribe to them
Future returned by send_event. await
on it blocks until all instances of crate::Event sent to EventStream
by this send_event
are dropped
Traits
Standartized interface for object reacting to events of specific type. The trait have two methods: on_event_owned
which accepts
event object and on_event_ref
, accepting borrowed reference to event. It’s supposed that both bethods should work identically.
But sometimes if it is necessary to retranslate the event received. So it is effective to handle owned event case separately from borrowed.
If the event object implements ToOwned trait (note that all Clone object implements it), EventSink implementation can be simplified by implementing helper EventSinkExt with only one event handler accepting std::borrow::Cow parameter, instead of separate handlers for owned and borrowed cases
Standartized interface for structures providing sream of events of specified type.
Functions
Connect EventSource to EventSink: run asynchronous task which reads events from source and calls EventSink::on_event_ref on sink object. Source may provide events for multiple readers, so only references to events are available from it.
Same as spawn_event_pipe, but also returns handle to task spawned by [futures::task::SpawnExt::spawn_with_handle]